Amazon Athenaのクエリ実行処理をシンプルにできる「Athena-Query」を使ってみた
こんにちは、CX事業本部 IoT事業部の若槻です。
Athena-Queryは、Amazon Athenaのクエリ実行開始およびクエリ結果取得のシンプルな記述を可能にするライブラリです。
以前までは次の記事のようにクエリ実行開始後に完了を待機するポーリング処理を設ける必要があり処理が煩雑になっていたのですが、Athena-Queryを使えばその必要も無くなります。
実はAthena-Queryは弊社のCX事業本部でやまたつさんを中心に開発され、昨年12月にOSSとして公開されたものとなっています。
試してみた
Athena-Queryを実際に使ってみます。と言っても使い方はとてもシンプルです。
インストール
npm install @classmethod/athena-query @aws-sdk/client-athena
ローカルで実行してみる
athenaQuery.query()
でAsyncGeneratorが返るので、for await...of
でイテレートします。
import { Athena } from '@aws-sdk/client-athena'; import AthenaQuery from '@classmethod/athena-query'; const athena = new Athena({}); const athenaQuery = new AthenaQuery(athena); export const handler = async () => { for await (const item of athenaQuery.query('SELECT * FROM device_table;')) { console.log(item); } }; handler();
実行すると、クエリ実行結果が取得できました。
$ npx tsx athena-query.ts { amount: 1, deviceid: 'あああ' } { amount: 15, deviceid: 'device_01' }
オプション
AthenaQuery
の初期化時に、クエリで使用されるAthena Workgroup、Glue Database、Glue Catalogをオプションで指定できます。指定しない場合は下記のデフォルト値となります。
//デフォルト値 const athenaQuery = new AthenaQuery(athena, { db: 'default', workgroup: 'primary', catalog: 'AwsDataCatalog', });
取得されるレコードの最大数およびParaeterized Queryも指定できます。
const resultGen = athenaQuery.query( ` SELECT * FROM waf_logs WHERE name = ? AND groupId = ? AND score > ?; `, { executionParameters: ["test", 123, 456n], maxResults: 100, } );
AWS Lambda上で実行してみる
Athena-QueryのコードをAWS Lambda上で実行してみます。
import { Athena } from '@aws-sdk/client-athena'; import AthenaQuery from '@classmethod/athena-query'; const athena = new Athena({}); const athenaQuery = new AthenaQuery(athena, { db: 'default', workgroup: 'primary', catalog: 'AwsDataCatalog', }); export const handler = async () => { const items = []; for await (const item of athenaQuery.query('SELECT * FROM device_table;')) { items.push(item); } return items; };
デプロイはCDKで行います。GlueやAthena周りのリソースは別途作成済み、権限付与は簡略化しています。
import { Construct } from 'constructs'; import { aws_lambda_nodejs, aws_iam, Stack, StackProps } from 'aws-cdk-lib'; export class CdkSampleStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); const queryDevicesFunc = new aws_lambda_nodejs.NodejsFunction( this, 'queryDevicesFunc' ); queryDevicesFunc.addToRolePolicy( new aws_iam.PolicyStatement({ actions: ['athena:*', 'glue:*', 'athena:*'], resources: ['*'], }) ); } }
デプロイを実施。
npx cdk deploy "*" --require-approval never
Lambdaで実行できました。
[ { "amount": 1, "deviceid": "あああ" }, { "amount": 15, "deviceid": "device_01" } ]
おわりに
Amazon Athenaのクエリ実行処理をシンプルにできる「Athena-Query」を使ってみました。
実は以前に同様のことを実現できるライブラリとして「Athena-Express」を試してみたのですが、Lambda上で動かすことができず仕舞いだったので、Athena-Queryの登場は個人的にとても嬉しいものでした。
参考
以上